{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Chapter 3 - Learning PySpark\n",
    "## Resilient Distributed Datasets"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Creating RDDs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "There are two ways to create an RDD in PySpark. You can parallelize a list"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "data = sc.parallelize(\n",
    "    [('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12), \n",
    "     ('Amber', 9)])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "or read from a repository (a file or a database)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "data_from_file = sc.\\\n",
    "    textFile(\n",
    "        '/Users/drabast/Documents/PySpark_Data/VS14MORT.txt.gz', \n",
    "        4)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note, that to execute the code above you will have to change the path where the data is stored. The dataset can be downloaded from http://tomdrabas.com/data/VS14MORT.txt.gz"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Schema"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "RDDs are *schema-less* data structures."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain', 'visited', 4504]]"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_heterogenous = sc.parallelize([('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain','visited', 4504]]).collect()\n",
    "data_heterogenous"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can access the data in the object as you would normally do in Python."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "100000"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_heterogenous[1]['Porsche']"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Reading from files"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When you read from a text file, each row from the file forms an element of an RDD."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['                   1                                          2101  M1087 432311  4M4                2014U7CN                                    I64 238 070   24 0111I64                                                                                                                                                                           01 I64                                                                                                  01  11                                 100 601']"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_from_file.take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### User defined functions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can create *longer* method to transform your data instead of using the `lambda` expression."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def extractInformation(row):\n",
    "    import re\n",
    "    import numpy as np\n",
    "\n",
    "    selected_indices = [\n",
    "         2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,\n",
    "         19,21,22,23,24,25,27,28,29,30,32,33,34,\n",
    "         36,37,38,39,40,41,42,43,44,45,46,47,48,\n",
    "         49,50,51,52,53,54,55,56,58,60,61,62,63,\n",
    "         64,65,66,67,68,69,70,71,72,73,74,75,76,\n",
    "         77,78,79,81,82,83,84,85,87,89\n",
    "    ]\n",
    "\n",
    "    '''\n",
    "        Input record schema\n",
    "        schema: n-m (o) -- xxx\n",
    "            n - position from\n",
    "            m - position to\n",
    "            o - number of characters\n",
    "            xxx - description\n",
    "        1. 1-19 (19) -- reserved positions\n",
    "        2. 20 (1) -- resident status\n",
    "        3. 21-60 (40) -- reserved positions\n",
    "        4. 61-62 (2) -- education code (1989 revision)\n",
    "        5. 63 (1) -- education code (2003 revision)\n",
    "        6. 64 (1) -- education reporting flag\n",
    "        7. 65-66 (2) -- month of death\n",
    "        8. 67-68 (2) -- reserved positions\n",
    "        9. 69 (1) -- sex\n",
    "        10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated\n",
    "        11. 71-73 (3) -- number of units (years, months etc)\n",
    "        12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)\n",
    "        13. 75-76 (2) -- age recoded into 52 categories\n",
    "        14. 77-78 (2) -- age recoded into 27 categories\n",
    "        15. 79-80 (2) -- age recoded into 12 categories\n",
    "        16. 81-82 (2) -- infant age recoded into 22 categories\n",
    "        17. 83 (1) -- place of death\n",
    "        18. 84 (1) -- marital status\n",
    "        19. 85 (1) -- day of the week of death\n",
    "        20. 86-101 (16) -- reserved positions\n",
    "        21. 102-105 (4) -- current year\n",
    "        22. 106 (1) -- injury at work\n",
    "        23. 107 (1) -- manner of death\n",
    "        24. 108 (1) -- manner of disposition\n",
    "        25. 109 (1) -- autopsy\n",
    "        26. 110-143 (34) -- reserved positions\n",
    "        27. 144 (1) -- activity code\n",
    "        28. 145 (1) -- place of injury\n",
    "        29. 146-149 (4) -- ICD code\n",
    "        30. 150-152 (3) -- 358 cause recode\n",
    "        31. 153 (1) -- reserved position\n",
    "        32. 154-156 (3) -- 113 cause recode\n",
    "        33. 157-159 (3) -- 130 infant cause recode\n",
    "        34. 160-161 (2) -- 39 cause recode\n",
    "        35. 162 (1) -- reserved position\n",
    "        36. 163-164 (2) -- number of entity-axis conditions\n",
    "        37-56. 165-304 (140) -- list of up to 20 conditions\n",
    "        57. 305-340 (36) -- reserved positions\n",
    "        58. 341-342 (2) -- number of record axis conditions\n",
    "        59. 343 (1) -- reserved position\n",
    "        60-79. 344-443 (100) -- record axis conditions\n",
    "        80. 444 (1) -- reserve position\n",
    "        81. 445-446 (2) -- race\n",
    "        82. 447 (1) -- bridged race flag\n",
    "        83. 448 (1) -- race imputation flag\n",
    "        84. 449 (1) -- race recode (3 categories)\n",
    "        85. 450 (1) -- race recode (5 categories)\n",
    "        86. 461-483 (33) -- reserved positions\n",
    "        87. 484-486 (3) -- Hispanic origin\n",
    "        88. 487 (1) -- reserved\n",
    "        89. 488 (1) -- Hispanic origin/race recode\n",
    "     '''\n",
    "\n",
    "    record_split = re\\\n",
    "        .compile(\n",
    "            r'([\\s]{19})([0-9]{1})([\\s]{40})([0-9\\s]{2})([0-9\\s]{1})([0-9]{1})([0-9]{2})' + \n",
    "            r'([\\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\\s]{1})([0-9]{2})([0-9]{2})' + \n",
    "            r'([0-9]{2})([0-9\\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\\s]{16})([0-9]{4})' +\n",
    "            r'([YNU]{1})([0-9\\s]{1})([BCOU]{1})([YNU]{1})([\\s]{34})([0-9\\s]{1})([0-9\\s]{1})' +\n",
    "            r'([A-Z0-9\\s]{4})([0-9]{3})([\\s]{1})([0-9\\s]{3})([0-9\\s]{3})([0-9\\s]{2})([\\s]{1})' + \n",
    "            r'([0-9\\s]{2})([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})' + \n",
    "            r'([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})' + \n",
    "            r'([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})' + \n",
    "            r'([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})([A-Z0-9\\s]{7})' + \n",
    "            r'([A-Z0-9\\s]{7})([\\s]{36})([A-Z0-9\\s]{2})([\\s]{1})([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})' + \n",
    "            r'([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})' + \n",
    "            r'([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})' + \n",
    "            r'([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})' + \n",
    "            r'([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})([A-Z0-9\\s]{5})([\\s]{1})([0-9\\s]{2})([0-9\\s]{1})' + \n",
    "            r'([0-9\\s]{1})([0-9\\s]{1})([0-9\\s]{1})([\\s]{33})([0-9\\s]{3})([0-9\\s]{1})([0-9\\s]{1})')\n",
    "    try:\n",
    "        rs = np.array(record_split.split(row))[selected_indices]\n",
    "    except:\n",
    "        rs = np.array(['-99'] * len(selected_indices))\n",
    "    return rs\n",
    "#     return record_split.split(row)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, instead of using `lambda` we will use the `extractInformation(...)` method to split and convert our dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',\n",
       "        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',\n",
       "        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',\n",
       "        '       ', '       ', '       ', '       ', '       ', '       ',\n",
       "        '       ', '       ', '       ', '       ', '       ', '       ',\n",
       "        '       ', '       ', '       ', '       ', '       ', '01',\n",
       "        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',\n",
       "        '     ', '     ', '     ', '     ', '     ', '     ', '     ',\n",
       "        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',\n",
       "        ' ', '1', '1', '100', '6'], \n",
       "       dtype='<U40')]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_from_file_conv = data_from_file.map(extractInformation)\n",
    "data_from_file_conv.map(lambda row: row).take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Transformations"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### .map(...)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The method is applied to each element of the RDD: in the case for the `data_from_file_conv` dataset you can think of this as a transformation of each row."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, -99]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_2014 = data_from_file_conv.map(lambda row: int(row[16]))\n",
    "data_2014.take(10)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can combine more columns."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('2014', 2014),\n",
       " ('2014', 2014),\n",
       " ('2014', 2014),\n",
       " ('2014', 2014),\n",
       " ('2014', 2014),\n",
       " ('2014', 2014),\n",
       " ('2014', 2014),\n",
       " ('2014', 2014),\n",
       " ('2014', 2014),\n",
       " ('-99', -99)]"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))\n",
    "data_2014_2.take(10)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### .filter(...)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `.filter(...)` method allows you to select elements of your dataset that fit specified criteria."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "6"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')\n",
    "data_filtered.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### .flatMap(...)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `.flatMap(...)` method works similarly to `.map(...)` but returns a flattened results instead of a list. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))\n",
    "data_2014_flat.take(10)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### .distinct()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This method returns a list of distinct values in a specified column."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['-99', 'M', 'F']"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()\n",
    "distinct_gender"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### .sample(...)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `.sample()` method returns a randomized sample from the dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[array(['1', '  ', '5', '1', '01', 'F', '1', '082', ' ', '42', '22', '10',\n",
       "        '  ', '4', 'W', '5', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I251',\n",
       "        '215', '063', '   ', '21', '02', '11I350 ', '21I251 ', '       ',\n",
       "        '       ', '       ', '       ', '       ', '       ', '       ',\n",
       "        '       ', '       ', '       ', '       ', '       ', '       ',\n",
       "        '       ', '       ', '       ', '       ', '       ', '02',\n",
       "        'I251 ', 'I350 ', '     ', '     ', '     ', '     ', '     ',\n",
       "        '     ', '     ', '     ', '     ', '     ', '     ', '     ',\n",
       "        '     ', '     ', '     ', '     ', '     ', '     ', '28', ' ',\n",
       "        ' ', '2', '4', '100', '8'], \n",
       "       dtype='<U40')]"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "fraction = 0.1\n",
    "data_sample = data_from_file_conv.sample(False, fraction, 666)\n",
    "\n",
    "data_sample.take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's confirm that we really got 10% of all the records."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Original dataset: 2631171, sample: 263247\n"
     ]
    }
   ],
   "source": [
    "print('Original dataset: {0}, sample: {1}'.format(data_from_file_conv.count(), data_sample.count()))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### .leftOuterJoin(...)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Left outer join, just like the SQL world, joins two RDDs based on the values found in both datasets, and returns records from the left RDD with records from the right one appended where the two RDDs match."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('c', (10, None)), ('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])\n",
    "rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])\n",
    "\n",
    "rdd3 = rdd1.leftOuterJoin(rdd2)\n",
    "rdd3.take(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If we used `.join(...)` method instead we would have gotten only the values for `'a'` and `'b'` as these two values intersect between these two RDDs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd4 = rdd1.join(rdd2)\n",
    "rdd4.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Another useful method is the `.intersection(...)` that returns the records that are *equal* in both RDDs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('a', 1)]"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd5 = rdd1.intersection(rdd2)\n",
    "rdd5.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### .repartition(...)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Repartitioning the dataset changes the number of partitions the dataset is divided into."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "4"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd1 = rdd1.repartition(4)\n",
    "\n",
    "len(rdd1.glom().collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Actions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### .take(...)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The method returns `n` top rows from a single data partition."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',\n",
       "        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',\n",
       "        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',\n",
       "        '       ', '       ', '       ', '       ', '       ', '       ',\n",
       "        '       ', '       ', '       ', '       ', '       ', '       ',\n",
       "        '       ', '       ', '       ', '       ', '       ', '01',\n",
       "        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',\n",
       "        '     ', '     ', '     ', '     ', '     ', '     ', '     ',\n",
       "        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',\n",
       "        ' ', '1', '1', '100', '6'], \n",
       "       dtype='<U40')]"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_first = data_from_file_conv.take(1)\n",
    "data_first"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you want somewhat randomized records you can use `.takeSample(...)` instead."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[array(['2', '17', ' ', '0', '08', 'M', '1', '069', ' ', '39', '19', '09',\n",
       "        '  ', '1', 'M', '7', '2014', 'U', '7', 'U', 'N', ' ', ' ', 'I251',\n",
       "        '215', '063', '   ', '21', '06', '11I500 ', '21I251 ', '61I499 ',\n",
       "        '62I10  ', '63N189 ', '64K761 ', '       ', '       ', '       ',\n",
       "        '       ', '       ', '       ', '       ', '       ', '       ',\n",
       "        '       ', '       ', '       ', '       ', '       ', '05',\n",
       "        'I251 ', 'I120 ', 'I499 ', 'I500 ', 'K761 ', '     ', '     ',\n",
       "        '     ', '     ', '     ', '     ', '     ', '     ', '     ',\n",
       "        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',\n",
       "        ' ', '1', '1', '100', '6'], \n",
       "       dtype='<U40')]"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_take_sampled = data_from_file_conv.takeSample(False, 1, 667)\n",
    "data_take_sampled"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### .reduce(...)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Another action that processes your data, the `.reduce(...)` method *reduces* the elements of an RDD using a specified method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "15"
      ]
     },
     "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If the reducing function is not associative and commutative you will sometimes get wrong results depending how your data is partitioned."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "I we were to reduce the data in a manner that we would like to *divide* the current result by the subsequent one, we would expect a value of 10"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "10.0"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "works = data_reduce.reduce(lambda x, y: x / y)\n",
    "works"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "However, if you were to partition the data into 3 partitions, the result will be wrong."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0.004"
      ]
     },
     "execution_count": 24,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3)\n",
    "data_reduce.reduce(lambda x, y: x / y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `.reduceByKey(...)` method works in a similar way to the `.reduce(...)` method but performs a reduction on a key-by-key basis."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('b', 4), ('c', 2), ('a', 12), ('d', 5)]"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)\n",
    "data_key.reduceByKey(lambda x, y: x + y).collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### .count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `.count()` method counts the number of elements in the RDD."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "6"
      ]
     },
     "execution_count": 26,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_reduce.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It has the same effect as the method below but does not require shifting the data to the driver."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "6"
      ]
     },
     "execution_count": 27,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "len(data_reduce.collect()) # WRONG -- DON'T DO THIS!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If your dataset is in a form of a *key-value* you can use the `.countByKey()` method to get the counts of distinct keys."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "dict_items([('a', 2), ('b', 2), ('d', 2), ('c', 1)])"
      ]
     },
     "execution_count": 28,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data_key.countByKey().items()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### .saveAsTextFile(...)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As the name suggests, the `.saveAsTextFile()` the RDD and saves it to text files: each partition to a separate file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "data_key.saveAsTextFile('/Users/drabast/Documents/PySpark_Data/data_key.txt')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To read it back, you need to parse it back as, as before, all the rows are treated as strings."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('a', 4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3)]"
      ]
     },
     "execution_count": 31,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "def parseInput(row):\n",
    "    import re\n",
    "    \n",
    "    pattern = re.compile(r'\\(\\'([a-z])\\', ([0-9])\\)')\n",
    "    row_split = pattern.split(row)\n",
    "    \n",
    "    return (row_split[1], int(row_split[2]))\n",
    "    \n",
    "data_key_reread = sc \\\n",
    "    .textFile('/Users/drabast/Documents/PySpark_Data/data_key.txt') \\\n",
    "    .map(parseInput)\n",
    "data_key_reread.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**.foreach(...)**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A method that applies the same function to each element of the RDD in an iterative way."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def f(x): \n",
    "    print(x)\n",
    "\n",
    "data_key.foreach(f)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
